package com.tony.provider.config;//package com.tony.common.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 消息发送端
 *
 * @author tony
 * @date 2021-9-15
 */
@Slf4j
@Component
public class MqttPushClient {

    @Autowired
    private MqttClient mqttClient;

    private static volatile MqttPushClient mqttPushClient = null;

    @Bean
    public static MqttPushClient getInstance() throws MqttException {

        if (null == mqttPushClient) {
            synchronized (MqttPushClient.class) {
                if (null == mqttPushClient) {
                    mqttPushClient = new MqttPushClient();
                }
            }
        }
        return mqttPushClient;
    }

    /**
     * 发布主题，用于通知
     * 默认qos为1 非持久化
     *
     * @param topic
     * @param data
     */
    public void publish(String topic, String data) {
        publish(topic, data, 1, false);
    }

    /**
     * 发布
     *
     * @param topic
     * @param data
     * @param qos     对消息处理的几种机制。
     *                0 表示的是订阅者没收到消息不会再次发送，消息会丢失。
     *                1 表示的是会尝试重试，一直到接收到消息，但这种情况可能导致订阅者收到多次重复消息。
     *                2 多了一次去重的动作，确保订阅者收到的消息有一次。
     * @param retained
     */
    public void publish(String topic, String data, int qos, boolean retained) {

        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(data.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        if (null == mqttTopic) {
            log.error("Topic不存在");
        }
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (Exception e) {
            log.error("发布失败", e);
        }
    }
}
